Multimodal LiDAR-Camera Sensor Fusion Algorithm for Road Longitudinal Slope Estimation¶
I utilized the outputs of pretrained vision models (semantic segmentation and YOLO) and developed a pipeline involving geometric calibration, 3D-to-2D projection, and DBSCAN-based clustering to precisely extract road-surface points. Traditional methods suffered from high error rates due to noise from roadside objects — such as guardrails, trees, and vehicles — being mistakenly included during image–LiDAR alignment. To address this, I developed a DBSCAN-based road surface extraction algorithm that effectively removes such noise, ultimately reducing longitudinal slope estimation error by approximately 30%.
Data Availability¶
The raw data used in this project is corporate confidential data and cannot be publicly shared. Therefore, this repository contains only the analysis code (ipynb) without the original dataset. For security reasons, all sensitive information and paths have been removed.
import open3d as o3d
import pandas as pd
import numpy as np
import os, json, warnings, math
import cv2
import matplotlib.pyplot as plt
from mpl_toolkits.mplot3d import Axes3D
from sklearn.cluster import DBSCAN
from sklearn.preprocessing import StandardScaler
# Set file paths
# NOTE(review): absolute machine-local paths — parameterize via config/env before reuse
pcd_dir = '/Users/jiyeonhwang/Downloads/서울대 교통계획연구실/도도/도도 종단경사/테스트베드/lidar'  # input .pcd LiDAR scans
mask_dir = '/Users/jiyeonhwang/Downloads/서울대 교통계획연구실/도도/도도 종단경사/테스트베드/sementic_1'  # semantic-segmentation RGB masks
origin = np.array([0.0, 0.0, -2.27], dtype=np.float32) # Set vehicle origin coordinates
road_rgb = np.array([128, 64, 128]) # Road color (RGB threshold)
# NOTE(review): BASE_POINT duplicates `origin`; only BASE_POINT is used downstream
BASE_POINT = np.array([0.0, 0.0, -2.27], dtype=np.float32)
Define preprocessing functions¶
# Import functions required for preprocessing
# 1. Define PCD reader (xyz + intensity)
def read_pcd_xyz_intensity(path):
    """Read a PCD file and return (xyz, intensity) as float32 arrays.

    Raises ValueError when the cloud carries no 'intensity' attribute.
    """
    cloud = o3d.t.io.read_point_cloud(path)
    positions = cloud.point['positions'].numpy().astype(np.float32)
    if 'intensity' not in cloud.point:
        raise ValueError("intensity 필드가 존재하지 않습니다.")
    intensities = cloud.point['intensity'].numpy().astype(np.float32).flatten()
    return positions, intensities
# 2. Calibration utilities
def quat_to_R(w, x, y, z):
    """Convert a unit quaternion (w, x, y, z) to a 3x3 float32 rotation matrix."""
    xx, yy, zz = x * x, y * y, z * z
    xy, xz, yz = x * y, x * z, y * z
    wx, wy, wz = w * x, w * y, w * z
    R = np.empty((3, 3), dtype=np.float32)
    R[0] = [1 - 2 * (yy + zz), 2 * (xy - wz), 2 * (xz + wy)]
    R[1] = [2 * (xy + wz), 1 - 2 * (xx + zz), 2 * (yz - wx)]
    R[2] = [2 * (xz - wy), 2 * (yz + wx), 1 - 2 * (xx + yy)]
    return R
def make_tf(ext):
    """Build a 4x4 homogeneous transform from an extrinsic dict.

    Expects quaternion keys 'w','x','y','z' and translation 'tx','ty','tz'.
    """
    transform = np.eye(4, dtype=np.float32)
    transform[:3, :3] = quat_to_R(ext["w"], ext["x"], ext["y"], ext["z"])
    transform[:3, 3] = np.array([ext["tx"], ext["ty"], ext["tz"]], dtype=np.float32)
    return transform
# 3. Load calibration parameters (camera intrinsics/extrinsics) from JSON
calib_path = '/Users/jiyeonhwang/Downloads/서울대 교통계획연구실/도도/도도 종단경사/지연 test 파일/Calib.json'
with open(calib_path, 'r') as f:
    calib = json.load(f)
intr = calib["01_camera"]["3_intrinsic"]
extr = calib["01_camera"]["4_extrinsic"]
# 3x3 camera intrinsic matrix (skew defaults to 0 when absent)
K = np.float32([
    [intr["fx"], intr.get("skew", 0.0), intr["cx"]],
    [0.0, intr["fy"], intr["cy"]],
    [0.0, 0.0, 1.0],
])
# Lens distortion coefficients in OpenCV order (k1, k2, p1, p2, k3)
dist = np.float32([intr["k1"], intr["k2"], intr["p1"], intr["p2"], intr.get("k3", 0.0)])
# LiDAR -> camera 4x4 extrinsic transform
tf = make_tf(extr)
# 4. Define file paths and ROI
# Region of interest in LiDAR coordinates (meters); y in [-100, 0] looks ahead of the vehicle
ROI_X = (-20.0, 20.0)
ROI_Y = (-100.0, 0.0)
ROI_Z = (-20.0, 20.0)
# NOTE(review): re-defines road_rgb with the same value already set above
road_rgb = np.array([128, 64, 128])
# # trimming 비율
# TRIM_RATIO_X = 0.10
# TRIM_RATIO_Z = 0.05
# # trim 함수
# def trim_extremes(idxs, vals, ratio):
# N = len(idxs)
# k = int(math.floor(ratio * N))
# if N < 2*k+1:
# return idxs
# order = np.argsort(vals[idxs])
# return idxs[order[k:N-k]]
# Create a sorted list of PCD files to process.
# Fix: match the '.pcd' extension explicitly — endswith('pcd') would also
# accept names that merely end in "pcd" without the dot.
file_list = sorted(f for f in os.listdir(pcd_dir) if f.endswith('.pcd'))
Projection + Alignment Preprocessing Loop¶
from tqdm import tqdm
data_dict = {}  # file_name -> projection/alignment results for that frame
for file_name in tqdm(file_list):
    try:
        pcd_path = os.path.join(pcd_dir, file_name)
        mask_path = os.path.join(mask_dir, file_name.replace('.pcd', '_mask_rgb.png'))
        # Load PCD (xyz + intensity)
        xyz_all, intensity_all = read_pcd_xyz_intensity(pcd_path)
        # Transform LiDAR points into the camera frame (homogeneous coords)
        hom_all = np.hstack([xyz_all, np.ones((len(xyz_all), 1), np.float32)])
        cam_all = (tf @ hom_all.T).T[:, :3]
        # Keep only points in front of the camera (positive depth)
        mask_f = cam_all[:, 2] > 0
        xyz_all, cam_all = xyz_all[mask_f], cam_all[mask_f]
        intensity_all = intensity_all[mask_f]
        # Project to 2D image coordinates
        px, _ = cv2.projectPoints(cam_all.astype(np.float32),
                                  np.zeros((3, 1)), np.zeros((3, 1)),
                                  K, dist)
        uv_f = px.reshape(-1, 2)
        # Fix: drop non-finite projections before the int cast — previously
        # NaN/inf values were cast, raising "invalid value encountered in
        # cast" and yielding garbage pixel indices
        finite = np.all(np.isfinite(uv_f), axis=1)
        xyz_all, cam_all = xyz_all[finite], cam_all[finite]
        intensity_all = intensity_all[finite]
        uv = uv_f[finite].astype(int)
        seg_bgr = cv2.imread(mask_path)
        # Fix: cv2.imread returns None (not an exception) for a missing or
        # unreadable mask; skip the frame explicitly
        if seg_bgr is None:
            print(f"{file_name} : 마스크 없음")
            continue
        seg_rgb = cv2.cvtColor(seg_bgr, cv2.COLOR_BGR2RGB)
        H, W = seg_rgb.shape[:2]
        # Pixels that actually fall inside the image bounds
        valid = (
            (0 <= uv[:, 0]) & (uv[:, 0] < W) &
            (0 <= uv[:, 1]) & (uv[:, 1] < H)
        )
        seg_col = np.zeros((len(uv), 3), np.uint8)
        seg_col[valid] = seg_rgb[uv[valid, 1], uv[valid, 0]]
        # ROI and road color filtering
        roi_mask = (
            (ROI_X[0] <= xyz_all[:, 0]) & (xyz_all[:, 0] <= ROI_X[1]) &
            (ROI_Y[0] <= xyz_all[:, 1]) & (xyz_all[:, 1] <= ROI_Y[1]) &
            (ROI_Z[0] <= xyz_all[:, 2]) & (xyz_all[:, 2] <= ROI_Z[1])
        )
        # Road pixels: within +/-20 of the segmentation road color per channel
        sem_road = np.all(np.abs(seg_col.astype(int) - road_rgb) <= 20, axis=1)
        candidate_idx = np.where(roi_mask & sem_road)[0]
        road_pts = xyz_all[candidate_idx]
        intensity_roi = intensity_all[candidate_idx]
        if len(road_pts) == 0:
            print(f"{file_name} : 도로 점 없음")
            continue
        # Farthest road point along the longitudinal axis (most negative y)
        local_far_idx = np.argmin(road_pts[:, 1])
        global_far_idx = candidate_idx[local_far_idx]
        far_pt = road_pts[local_far_idx]
        data_dict[file_name] = {  # Store preprocessing results in dictionary
            "xyz_all": xyz_all,
            "intensity_all": intensity_all,
            "road_pts": road_pts,
            "intensity_roi": intensity_roi,
            "final_idx": candidate_idx,
            "global_far_idx": global_far_idx,
            "far_pt": far_pt
        }
    except Exception as e:
        print(f"[에러] {file_name}: {e}")
0%| | 0/100 [00:00<?, ?it/s]/var/folders/57/dv1c_0xn3xs1tzzxdvgp0pth0000gn/T/ipykernel_7874/1114291487.py:24: RuntimeWarning: invalid value encountered in cast uv = px.reshape(-1, 2).astype(int) 100%|██████████| 100/100 [00:33<00:00, 3.00it/s]
Loop for generating normalized features for DBSCAN¶
# Build a (N,4) feature matrix [x, y, z, intensity] per file and
# z-score normalize it (the four columns live on very different scales)
features_dict = {}
for fname in file_list:
    entry = data_dict[fname]
    raw = np.hstack([entry['road_pts'], entry['intensity_roi'].reshape(-1, 1)])
    features_dict[fname] = StandardScaler().fit_transform(raw)
# Apply per-feature weights to the normalized features:
# (x, y, z, intensity) — z is emphasized so height separates road from noise
weights = np.array([0.5, 0.5, 4.0, 1.0])
features_w_dict = {fname: features_dict[fname] * weights for fname in file_list}
DBSCAN 1: Using normalized features¶
# DBSCAN on the normalized (unweighted) features of one sample file.
# eps: neighborhood radius in feature space, min_samples: core-point threshold
db = DBSCAN(eps=0.3, min_samples=10)
labels = db.fit_predict(features_dict[file_list[3]])
unique_labels = np.unique(labels)
colors = np.zeros((len(labels), 3))
# Fix: plt.cm.get_cmap is deprecated since Matplotlib 3.7 — use plt.get_cmap
colormap = plt.get_cmap('Set3', len(unique_labels))
for idx, label in enumerate(unique_labels):
    if label == -1:
        colors[labels == label] = [0.5, 0.5, 0.5]  # noise points -> gray
    else:
        colors[labels == label] = colormap(idx)[:3]
# Visualization of the clustered road points
pcd = o3d.geometry.PointCloud()
pcd.points = o3d.utility.Vector3dVector(data_dict[file_list[3]]['road_pts'])
pcd.colors = o3d.utility.Vector3dVector(colors)
o3d.visualization.draw_geometries([pcd])
/var/folders/57/dv1c_0xn3xs1tzzxdvgp0pth0000gn/T/ipykernel_7874/521401732.py:9: MatplotlibDeprecationWarning: The get_cmap function was deprecated in Matplotlib 3.7 and will be removed in 3.11. Use ``matplotlib.colormaps[name]`` or ``matplotlib.colormaps.get_cmap()`` or ``pyplot.get_cmap()`` instead.
colormap = plt.cm.get_cmap('Set3', len(unique_labels))
Visualize DBSCAN clustering results together with the full point cloud¶
# Paint DBSCAN clusters onto the FULL point cloud: non-road points stay gray,
# each cluster of road candidates gets a distinct color.
target_file = file_list[3]
db = DBSCAN(eps=0.3, min_samples=10)
labels = db.fit_predict(features_dict[target_file])
unique_labels = np.unique(labels)
xyz_all = data_dict[target_file]['xyz_all']
candidate_idx = data_dict[target_file]['final_idx']
colors_full = np.ones((len(xyz_all), 3)) * 0.5  # default gray
# Fix: plt.cm.get_cmap is deprecated since Matplotlib 3.7 — use plt.get_cmap
colormap = plt.get_cmap('Set3', len(unique_labels))
for idx, label in enumerate(unique_labels):
    if label == -1:
        continue  # leave noise gray
    cluster_color = np.array(colormap(idx)[:3])
    local_mask = (labels == label)
    # Map cluster-local indices back to indices into the full cloud
    global_idx = candidate_idx[local_mask]
    colors_full[global_idx] = cluster_color
pcd = o3d.geometry.PointCloud()
pcd.points = o3d.utility.Vector3dVector(xyz_all)
pcd.colors = o3d.utility.Vector3dVector(colors_full)
o3d.visualization.draw_geometries([pcd])
/var/folders/57/dv1c_0xn3xs1tzzxdvgp0pth0000gn/T/ipykernel_7874/3966926208.py:14: MatplotlibDeprecationWarning: The get_cmap function was deprecated in Matplotlib 3.7 and will be removed in 3.11. Use ``matplotlib.colormaps[name]`` or ``matplotlib.colormaps.get_cmap()`` or ``pyplot.get_cmap()`` instead.
colormap = plt.cm.get_cmap('Set3', len(unique_labels))
DBSCAN 2: Using normalized and weighted features¶
# DBSCAN on the normalized AND weighted features (tighter eps since the
# weighted space is rescaled).
test_file = file_list[3]
db_w = DBSCAN(eps=0.2, min_samples=10)
labels_w = db_w.fit_predict(features_w_dict[test_file])
unique_labels_w = np.unique(labels_w)
colors_w = np.zeros((len(labels_w), 3))
# Fix: plt.cm.get_cmap is deprecated since Matplotlib 3.7 — use plt.get_cmap
colormap_w = plt.get_cmap('Set3', len(unique_labels_w))
for idx_w, label_w in enumerate(unique_labels_w):
    if label_w == -1:
        colors_w[labels_w == label_w] = [0.5, 0.5, 0.5]  # noise -> gray
    else:
        colors_w[labels_w == label_w] = colormap_w(idx_w)[:3]
pcd_w = o3d.geometry.PointCloud()
pcd_w.points = o3d.utility.Vector3dVector(data_dict[test_file]['road_pts'])
pcd_w.colors = o3d.utility.Vector3dVector(colors_w)
o3d.visualization.draw_geometries([pcd_w])
/var/folders/57/dv1c_0xn3xs1tzzxdvgp0pth0000gn/T/ipykernel_7874/505877789.py:9: MatplotlibDeprecationWarning: The get_cmap function was deprecated in Matplotlib 3.7 and will be removed in 3.11. Use ``matplotlib.colormaps[name]`` or ``matplotlib.colormaps.get_cmap()`` or ``pyplot.get_cmap()`` instead.
colormap_w = plt.cm.get_cmap('Set3', len(unique_labels_w))
Compute longitudinal slope after DBSCAN-based noise removal¶
from collections import Counter
# Select the largest cluster (excluding -1 which represents noise)
label_counter = Counter(labels_w)
label_counter.pop(-1, None) # remove noise label
road_label = label_counter.most_common(1)[0][0] # label with the most points
# Extract only the main road cluster
main_road = labels_w == road_label
main_road_pts = data_dict[test_file]['road_pts'][main_road]
# NOTE(review): argmin is taken over ALL road points rather than the
# denoised main cluster (main_road_pts) computed just above — the batch
# loop later in the file uses the cluster; confirm which is intended here
test_far_idx = np.argmin(data_dict[test_file]['road_pts'][:,1])
test_far_idx
# global_far_idx = candidate_idx[local_far_idx]
# far_pt = road_pts[local_far_idx]
np.int64(4989)
data_dict[test_file]['road_pts'][:,1]
array([-11.738706 , -11.199067 , -12.355909 , ..., -6.6102333,
-7.166094 , -6.880293 ], dtype=float32)
test_far_pts = data_dict[test_file]['road_pts'][test_far_idx]
test_far_pts
array([ 2.9195569, -99.54187 , -2.7990465], dtype=float32)
test_far_pts[:2]
array([ 2.9195569, -99.54187 ], dtype=float32)
BASE_POINT[2]
np.float32(-2.27)
# Slope from the vehicle base point to the farthest road point (exploratory cell)
horiz = np.linalg.norm(test_far_pts[:2] - BASE_POINT[:2])  # horizontal run (m)
vert = test_far_pts[2] - BASE_POINT[2]                     # vertical rise (m)
slope_ratio = (vert / horiz) * 100 if horiz != 0 else 0.0  # slope in percent
# Fix: was np.linalg.norm(far_pt - BASE_POINT), which used the stale
# `far_pt` left over from the preprocessing loop (last processed file)
# instead of this cell's point. Also renamed so the camera distortion
# coefficient array `dist` is no longer shadowed.
far_dist = np.linalg.norm(test_far_pts - BASE_POINT)
print(horiz, vert, slope_ratio, far_dist)
99.58467 -0.52904654 -0.531253 100.59322
The Kernel crashed while executing code in the current cell or a previous cell. Please review the code in the cell(s) to identify a possible cause of the failure. Click <a href='https://aka.ms/vscodeJupyterKernelCrash'>here</a> for more info. View Jupyter <a href='command:jupyter.viewOutput'>log</a> for further details.
from collections import Counter
import pandas as pd
# Batch pipeline: for every frame, denoise the road points with weighted
# DBSCAN, take the farthest point of the dominant cluster, and compute the
# longitudinal slope (%) and 3D distance from the vehicle base point.
BASE_POINT = np.array([0.0, 0.0, -2.27], dtype=np.float32)  # vehicle base point
FEATURE_WEIGHTS = np.array([0.5, 0.5, 4.0, 1.0])  # (x, y, z, intensity); z emphasized
results = []
for fname in file_list:
    # Fix: frames skipped during preprocessing (no road points / missing
    # mask) have no data_dict entry; previously the KeyError fell into the
    # broad except below and printed a spurious "[에러]" line
    if fname not in data_dict:
        continue
    try:
        road_pts = data_dict[fname]['road_pts']
        intensity = data_dict[fname]['intensity_roi']
        if len(road_pts) < 10:
            continue  # too few road points to cluster meaningfully
        # Normalize + apply feature weights (hoisted loop-invariant weights)
        features = np.hstack([road_pts, intensity.reshape(-1, 1)])
        features = StandardScaler().fit_transform(features) * FEATURE_WEIGHTS
        # DBSCAN clustering on the weighted feature space
        labels = DBSCAN(eps=0.2, min_samples=10).fit_predict(features)
        # Largest non-noise cluster is taken as the road surface
        label_counter = Counter(labels)
        label_counter.pop(-1, None)  # remove noise (-1)
        if not label_counter:
            continue
        road_label = label_counter.most_common(1)[0][0]
        main_road_pts = road_pts[labels == road_label]
        # Farthest point along the longitudinal axis (min y)
        far_pt = main_road_pts[np.argmin(main_road_pts[:, 1])]
        # Horizontal run, vertical rise, slope (%), and 3D distance
        horiz = np.linalg.norm(far_pt[:2] - BASE_POINT[:2])
        vert = far_pt[2] - BASE_POINT[2]
        slope = (vert / horiz) * 100 if horiz != 0 else 0.0
        dist = np.linalg.norm(far_pt - BASE_POINT)
        results.append({
            'file': fname,
            'slope_percent(%)': slope,
            'distance_m': dist
        })
    except Exception as e:
        print(f"[에러] {fname}: {e}")
df_result = pd.DataFrame(results)
df_result
| file | slope_percent(%) | distance_m | |
|---|---|---|---|
| 0 | 364800.099993.pcd | -0.122479 | 71.837372 |
| 1 | 364802.100011.pcd | -0.318304 | 66.746185 |
| 2 | 364803.700000.pcd | -0.012193 | 69.414627 |
| 3 | 364804.999997.pcd | -0.307521 | 59.249557 |
| 4 | 364806.099978.pcd | -0.498715 | 58.696812 |
| ... | ... | ... | ... |
| 95 | 367390.900000.pcd | -0.301984 | 55.855045 |
| 96 | 367391.600000.pcd | -0.213143 | 54.660049 |
| 97 | 367392.299998.pcd | -0.886404 | 58.215660 |
| 98 | 367393.000014.pcd | -0.601244 | 60.295650 |
| 99 | 367393.699993.pcd | -0.675602 | 61.510811 |
100 rows × 3 columns